iT邦幫忙

2024 iThome 鐵人賽

DAY 27
0
DevOps

我獨自升級:從水管工走向 DataOps系列 第 27

【Day 27】Data Pipeline 測試 - Code Quality feat. pytest

  • 分享至 

  • xImage
  •  

前言

感覺 pytest 的前世今生不太重要,已經存在很久了,有寫 python 的應該也都很熟,重點是看看怎麼應用在 Data Pipeline 當中/images/emoticon/emoticon12.gif

專案基本架構

{project root}
├── dags/
├── dbt/
├── plugins/
│ ├── include/
│ ├── operators/
│ └── hooks/
├── tests/
│ ├── conftest.py
│ ├── dags/
│ ├── data/
│ ├── operators/
│ └── hooks/
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

tests 會是獨立一個資料夾,其中會再細分針對 dags/ 或是其他 plugins 做測試,通常會分一個 data 的資料夾,主要是負責測試 data quality 的部分

測試 dag 範例

import pytest
from airflow.models import DagBag
from airflow.utils.dates import days_ago
from airflow.utils.state import State

@pytest.fixture
def dagbag():
    return DagBag()

def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="your_dag_id")
    assert dagbag.import_errors == {}
    assert dag is not None

def test_dag_structure(dagbag):
    dag = dagbag.get_dag(dag_id="your_dag_id")
    task_ids = [task.task_id for task in dag.tasks]
    assert "task_1" in task_ids
    assert "task_2" in task_ids

def test_task_dependencies(dagbag):
    dag = dagbag.get_dag(dag_id="your_dag_id")
    task_1 = dag.get_task("task_1")
    task_2 = dag.get_task("task_2")
    assert task_2.upstream_task_ids == {"task_1"}

def test_execute_dag(dagbag):
    dag = dagbag.get_dag(dag_id="your_dag_id")
    execution_date = days_ago(1)
    dag.clear(start_date=execution_date, end_date=execution_date)
    dag_run = dag.create_dagrun(
        run_id=f"test_execute_dag_{execution_date.isoformat()}",
        execution_date=execution_date,
        start_date=execution_date,
        state=State.RUNNING,
        external_trigger=False,
    )
    dag_run.run()
    assert dag_run.state == State.SUCCESS

最基本的會有這幾種測試

  1. test_dag_loaded 測載入是否正常
  2. test_dag_structure 測試 task 是否存在
  3. test_task_dependencies 測試 task 之間的上下游依賴關係
  4. test_execute_dag 會測試 DAG 運行並檢查其執行狀態

@pytest.fixture 如果直翻是「夾具」,其實就是預先定義好的內容,後續就省下許多重複的程式碼,通常也會直接寫在conftest.py 來創建一個 DagBag 對象,這樣我們可以在所有測試中重複使用它。

conftest.py

conftest.py 是 pytest 中的一個特殊文件,用於共享 fixtures、hooks 和其他設定 ,其中定義的東西可以跨多個測試文件,方便重複使用,另外可以針對 scope 設定作用的範圍,例如scope=session 的話就只有當下創建才有,scope=function 就是每個測試函數都會創一個新的 fixture 實例,透過conftest.py可以集中管理共享的 fixtures(夾具?),維護起來比較方便,而且放在裡面之後也不用另外 import,讚/images/emoticon/emoticon07.gif

# conftest.py
import pytest
from airflow.models import DagBag
from airflow.models import Connection
from airflow.utils.db import provide_session

@pytest.fixture(scope="session")
def dagbag():
    return DagBag()

@pytest.fixture(scope="function")
@provide_session
def mock_connection(session=None):
    conn = Connection(
        conn_id="test_conn",
        conn_type="postgres",
        host="localhost",
        login="airflow",
        password="airflow",
        schema="airflow"
    )
    session.add(conn)
    session.commit()
    ...

mock_connection 範例就是在裡面直接訂一個模擬的 db 連接~

pytest + great expectations 測試資料範例

還不知道 great expectations 的話,歡迎回到昨天的【Day 26】Data Pipeline 測試 - Data Quality feat. Great Expectations

import pytest
from great_expectations.data_context import DataContext
from great_expectations.core.batch import BatchRequest

@pytest.fixture(scope="module")
def ge_context():
    return DataContext()

def test_dbt_model_output(ge_context):
    #指定驗證的相關資料
    batch_request = BatchRequest(
        datasource_name="dbt_datasource",
        data_connector_name="default_inferred_data_connector_name",
        data_asset_name="my_dbt_table", 
        batch_identifiers={"default_identifier_name": "default_identifier"}
    )
    
    # 指定要使用的預期檢查
    expectation_suite_name = "my_dbt_table_suite"
    
    # 取得檢查
    validator = ge_context.get_validator(
        batch_request=batch_request, expectation_suite_name=expectation_suite_name
    )
    
    # 檢查和確認結果
    results = validator.validate()
    assert results["success"] is True, "數據未通過 Great Expectations 驗證"

1.DataContext:ge_context()

DataContext 是 Great Expectations 的核心,負責管理整個驗證過程所需的設定

2.BatchRequest

創 BatchRequest,指定要驗證的數據集,包含了數據來源、連接器、後續生成的表my_dbt_table名稱。

3.Expectation Suite

my_dbt_table_suite 是預先定義好的 Data Quality 檢查規則,定義了您對數據的預期,例如欄位範圍、要是唯一值等等。

4.檢查驗證過程

validator(驗證器)使用指定的Expectation Suite 驗證檢查規則對數據進行驗證。validate() 方法執行實際的驗證過程,return 一個包含結果的字典格式。

5.斷言 assert

最後的斷言檢查驗證是否成功。如果失敗,測試將拋出一個帶有錯誤消息的異常。


上一篇
【Day 26】Data Pipeline 測試 - Data Quality feat. Great Expectations
下一篇
【Day 28】Data Pipeline MVP 環境實戰 - .env 和 docker-compose.override.yml
系列文
我獨自升級:從水管工走向 DataOps30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言